package ct.analysis.tool;

import ct.common.constant.Names;
import ct.analysis.io.MySQLBeanOutputFormat;
import ct.analysis.mapper.AnalysisBeanMapper;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.util.Tool;
import ct.analysis.reducer.AnalysisBeanReducer;

/**
 * Hadoop {@link Tool} that configures and runs the data-analysis MapReduce job.
 *
 * <p>The job scans the HBase table named by {@code Names.TABLE}, restricted to the
 * {@code Names.CF_CALLER} column family, maps rows with {@link AnalysisBeanMapper},
 * reduces them with {@link AnalysisBeanReducer} and emits the output through
 * {@link MySQLBeanOutputFormat}.
 *
 * @Author Jeremy Zheng
 * @Date 2021/2/26 16:51
 * @Version 1.0
 */
public class AnalysisTextTool implements Tool {

    /** Configuration injected through {@link #setConf(Configuration)}; {@code null} until set. */
    private Configuration conf;

    /**
     * Builds the analysis job, submits it and blocks until it completes.
     *
     * @param strings command-line arguments; not used by this tool
     * @return {@code JobStatus.State.SUCCEEDED.getValue()} when the job succeeds,
     *         {@code JobStatus.State.FAILED.getValue()} otherwise
     * @throws Exception if the job cannot be set up or fails while running
     */
    @Override
    public int run(String[] strings) throws Exception {
        // Build the job from the injected configuration so that settings supplied by
        // the caller are honoured; fall back to a default configuration when
        // setConf was never called.
        Configuration current = getConf();
        Job job = (current != null) ? Job.getInstance(current) : Job.getInstance();
        job.setJarByClass(AnalysisTextTool.class);

        // Read only the caller column family: that is half of the data, and using
        // just one side avoids processing duplicates.
        Scan scan = new Scan();
        scan.addFamily(Bytes.toBytes(Names.CF_CALLER.getValue()));

        // Mapper: HBase table input; the two Text classes are the map output key and value.
        TableMapReduceUtil.initTableMapperJob(
                Names.TABLE.getValue(),
                scan,
                AnalysisBeanMapper.class,
                Text.class,
                Text.class,
                job
        );

        // Reducer: declare the final output key/value classes. The map output classes
        // were already set by initTableMapperJob above.
        job.setReducerClass(AnalysisBeanReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // Output format
        job.setOutputFormatClass(MySQLBeanOutputFormat.class);

        boolean succeeded = job.waitForCompletion(true);

        // NOTE(review): these are JobStatus state codes rather than the conventional
        // 0 / non-zero exit codes; kept unchanged so existing callers observe the
        // same values - confirm what callers expect.
        return succeeded
                ? JobStatus.State.SUCCEEDED.getValue()
                : JobStatus.State.FAILED.getValue();
    }

    /**
     * Stores the configuration to use when building the job.
     *
     * @param configuration the configuration to use; may be {@code null}
     */
    @Override
    public void setConf(Configuration configuration) {
        this.conf = configuration;
    }

    /**
     * Returns the configuration previously passed to {@link #setConf(Configuration)}.
     *
     * @return the stored configuration, or {@code null} if none has been set
     */
    @Override
    public Configuration getConf() {
        return conf;
    }
}
